Reactive Spring by Josh Long

Reactive Spring by Josh Long

Author: Josh Long
Language: eng
Format: mobi
Tags: Spring, Spring Boot, Reactor, Microservices
Published: 2020-09-13T00:00:00+00:00


When a client sends a message in, we adapt it to a Message object. Message instances store a client ID, the text of the message itself, and a timestamp.

package rsb.ws.chat;

import lombok.Data;
import lombok.RequiredArgsConstructor;

import java.util.Date;

/**
 * A single chat message: who sent it, what was said, and when.
 *
 * <p>All three fields are {@code final}; Lombok generates the accessors and a
 * constructor taking the fields in declaration order
 * ({@code clientId}, {@code text}, {@code when}).
 */
@RequiredArgsConstructor
@Data
class Message {

    /** Identifier of the client that sent the message. */
    private final String clientId;

    /** The text of the message itself. */
    private final String text;

    /** Timestamp attached to the message. */
    private final Date when;

}

The bulk of the chat implementation lives in ChatWebsocketConfiguration.

package rsb.ws.chat;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Wires up a WebSocket chat endpoint at {@code /ws/chat}.
 *
 * <p>Every inbound text frame is parsed as a JSON {@link Message}, re-stamped with
 * the sending session's id and the current time, and placed on a shared queue.
 * A single background thread drains that queue into a shared {@link Flux}, which
 * is serialised back to JSON and sent to every connected session.
 */
@Configuration
class ChatWebsocketConfiguration {

    // ① the Jackson mapper is supplied through the constructor
    ChatWebsocketConfiguration(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // ② currently connected sessions, keyed by WebSocket session id
    private final Map<String, Connection> sessions = new ConcurrentHashMap<>();

    // ③ inbound messages waiting to be broadcast
    private final BlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    private final ObjectMapper objectMapper;

    /**
     * Builds the chat handler.
     *
     * @return a handler that registers each session, enqueues what it receives,
     *         and sends it every message taken from the shared queue
     */
    @Bean
    WebSocketHandler chatWsh() {

        // ④ bridge the blocking queue into a Flux using one dedicated thread
        var messagesToBroadcast = Flux.<Message>create(sink -> {
            var executor = Executors.newSingleThreadExecutor();
            var submit = executor.submit(() -> {
                while (!sink.isCancelled()) {
                    try {
                        sink.next(this.messages.take());
                    }
                    catch (InterruptedException e) {
                        // Interrupted by cancellation: restore the flag and stop draining.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
            sink.onCancel(() -> {
                submit.cancel(true);
                // Without this the executor's worker thread would outlive the subscription.
                executor.shutdownNow();
            });
        }) //
                .share();

        return session -> {

            // ⑤ remember the session for as long as it is connected
            var sessionId = session.getId();
            this.sessions.put(sessionId, new Connection(sessionId, session));

            // ⑥ inbound: JSON text -> Message -> re-stamped Message -> queue
            var in = session
                    .receive() //
                    .map(WebSocketMessage::getPayloadAsText) //
                    .map(this::messageFromJson) //
                    .map(msg -> new Message(sessionId, msg.getText(), new Date())) //
                    .doOnNext(this.messages::offer) //
                    // ⑦ forget the session however the inbound stream ends (complete,
                    // error or cancel) so that dead sessions do not linger in the map
                    .doFinally(st -> this.sessions.remove(sessionId));

            // ⑧ outbound: every broadcast message as a JSON text frame
            var out = messagesToBroadcast
                    .map(this::jsonFromMessage) //
                    .map(session::textMessage);

            return session.send(out).and(in);
        };
    }

    // ⑨ JSON conversion helpers; checked Jackson exceptions are rethrown via @SneakyThrows
    @SneakyThrows
    private Message messageFromJson(String json) {
        return this.objectMapper.readValue(json, Message.class);
    }

    @SneakyThrows
    private String jsonFromMessage(Message msg) {
        return this.objectMapper.writeValueAsString(msg);
    }

    /**
     * Maps {@code /ws/chat} to the chat handler.
     *
     * @return a URL handler mapping with order {@code 2}
     */
    @Bean
    HandlerMapping chatHm() {
        var mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(Map.of("/ws/chat", chatWsh()));
        mapping.setOrder(2);
        return mapping;
    }

}



Download



Copyright Disclaimer:
This site does not store any files on its server. We only index and link to content provided by other sites. If any content infringes your copyright, please contact the content provider to have it deleted and email us; we will remove the relevant links or content immediately.